package com.duobaoyu.chatwebsocket.config;

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PropertyValueConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.duobaoyu.chatwebsocket.constant.MqConfigConstant;
import com.duobaoyu.chatwebsocket.constant.MqTagConstant;
import com.duobaoyu.chatwebsocket.listener.WeChatToEmpListener;
import com.duobaoyu.chatwebsocket.util.NetUtil;
import lombok.Getter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;

import javax.annotation.Resource;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Message queue configuration.
 *
 * <p>Declares the producer and consumer beans. Both are built from the
 * connection settings (access key, secret key, name server address) read from
 * {@link MqConfigConstant}.
 *
 * @author ningchang
 */
@Configuration
@Getter
@Order(value = 1)
public class MqConfig {

    /** Value set for {@link PropertyKeyConst#ConsumeThreadNums} on the consumer. */
    private static final String CONSUME_THREAD_NUMS = "50";

    @Resource
    private MqConfigConstant mqConfigConstant;

    @Resource
    private WeChatToEmpListener weChatToEmpListener;

    /**
     * Builds a fresh {@link Properties} instance holding the connection settings
     * shared by the producer and the consumer.
     *
     * <p>A new object is returned on every call, so callers may add their own
     * entries without affecting each other.
     *
     * @return properties containing the access key, secret key and name server address
     */
    private Properties getMqProperties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, mqConfigConstant.getAccessKey());
        properties.setProperty(PropertyKeyConst.SecretKey, mqConfigConstant.getSecretKey());
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, mqConfigConstant.getNameSrvAddr());
        return properties;
    }

    /**
     * Creates the producer bean configured with the shared connection settings.
     *
     * <p>The bean definition names {@code start} as its init method and
     * {@code shutdown} as its destroy method.
     *
     * @return the configured producer bean
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean producerBean() {
        ProducerBean producerBean = new ProducerBean();
        producerBean.setProperties(this.getMqProperties());
        return producerBean;
    }

    /**
     * Creates the consumer bean and registers its subscriptions.
     *
     * <p>On top of the shared connection settings, the consumer is given the
     * {@link PropertyValueConst#BROADCASTING} message model, the group id from
     * {@link MqConfigConstant} and a consume thread count of
     * {@link #CONSUME_THREAD_NUMS}. The bean definition names {@code start} as
     * its init method and {@code shutdown} as its destroy method.
     *
     * @return the configured consumer bean
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean consumerBean() {
        ConsumerBean consumerBean = new ConsumerBean();
        Properties properties = this.getMqProperties();
        // setProperty (not Hashtable#put) keeps every entry a String, matching the other settings.
        properties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfigConstant.getGroupId());
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, CONSUME_THREAD_NUMS);
        consumerBean.setProperties(properties);

        // Subscription relations: which listener handles which topic/tag expression.
        Map<Subscription, MessageListener> subscriptionTable = new ConcurrentHashMap<>(16);

        // Messages sent from the business layer to the socket layer.
        Subscription weChatToEmp = new Subscription();
        weChatToEmp.setTopic(mqConfigConstant.getConsumeTopic());
        weChatToEmp.setExpression(MqTagConstant.TO_SOCKET_BROADCASTING);
        subscriptionTable.put(weChatToEmp, weChatToEmpListener);

        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }

}
